跳到主要内容

gRPC 客户端

这篇分析是基于 grpc-go 1.48 这个版本来的

客户端 http2 流程

20220718145730

发起 RPC 调用

调用生成 go 代理,创建一个客户端 NewGoodbyeClient,然后发起 RPC 调用

func (c *goodbyeClient) SayGoodbye(ctx context.Context, in *GoodbyeRequest, opts ...grpc.CallOption) (*GoodbyeResponse, error) {
out := new(GoodbyeResponse)
err := c.cc.Invoke(ctx, "/goodbye_server.Goodbye/SayGoodbye", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

可以发现底层调用的还是这个 Invoke 方法,它执行的是一个 ClientConnInterface 接口,其有如下两个接口

type ClientConnInterface interface {
// 发送一元请求
Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error
// 创建流式请求
NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
}

它调用的是 call.go 文件实现的调用方法

func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
//把客户端的拦截器和和调用方法入参的拦截器合并(就是放到一个切片中)
opts = combine(cc.dopts.callOptions, opts)
//拦截器不为空就调用这个方法
if cc.dopts.unaryInt != nil {
//拦截器中还是会调用invoke方法(也就是下面的方法)
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
// ...

// 这个就是一个流传输的规范,不用管
var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
// 创建一个传输使用的客户端,因为流式请求也是这个 newClientStream 客户端,所以使用 unaryStreamDesc 标识请求类型
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// 发消息,cs 是 grpc.clientStream 对象,调用 clientStream 的 SendMsg 方法
if err := cs.SendMsg(req); err != nil {
return err
}
// 收消息,cs 是 grpc.clientStream 对象,调用 clientStream 的 RecvMsg 方法
return cs.RecvMsg(reply)
}

在 invoke 方法中主要做了如下如下事情

1、创建客户端流对象,这个方法中主要初始化一些流对象参数,比如超时时间,发送最大消息大小,接受最大消息大小

2、发送请求

3、接受服务端响应

发送请求

func (cs *clientStream) SendMsg(m interface{}) (err error) {
// 序列化数据
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
if err != nil {
return err
}


// ...
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
m, data = nil, nil
return err
}
// ...

//开始执行发送,带重试功能
err = cs.withRetry(op,
func() {
cs.bufferForRetryLocked(len(hdr)+len(payload), op)
}
)
return
}
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
// ...
if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
if !cs.desc.ClientStreams {
return nil
}
return io.EOF
}
// ...
}

最终是通过 a.t.Write 发出的数据写操作,a.t 是一个 ClientTransport 类型,所以最终是通过 ClientTransport 这个结构体的 Write 方法发送数据

接受响应

RecvMsg

func (cs *clientStream) RecvMsg(m interface{}) error {
// ...
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
// ...
}

recvMsg

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
// ...
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
// ...
if a.statsHandler != nil {
a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength,
Length: len(payInfo.uncompressedBytes),
})
}
// ...
}

recv

func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
// ...
}

recvAndDecompress

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
// ...
}

recvMsg

func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
if _, err := p.r.Read(p.header[:]); err != nil {
return 0, nil, err
}
// ...
}

最终还是调用了 p.r.Read 方法,p.r 是一个 io.Reader 接口, s *transport.Stream类型

References